package com.flink.carmonitor;


import com.flink.dto.CardSpeedDTO;
import com.flink.dto.MonitorSpeedLimitInfo;
import com.flink.dto.OverSpeedCarInfo;
import com.flink.utils.JdbcSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/***
 * 统计车辆超速 百分之二十的车辆
 */
public class OverSpeedCardMonitor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //读取kafka中数据

        //从kafka中读取数据
        Properties properties = new Properties();
        properties.setProperty("group.id", "test");
        properties.setProperty("bootstrap.servers", "172.16.12.148:9092,172.16.12.149:9092");
        properties.setProperty("auto.offset.reset", "latest");
        properties.setProperty("key-serializer", "org.apache.kafka.common.serialization.Deserializer");
        properties.setProperty("value-serializer", "org.apache.kafka.common.serialization.Deserializer");
       // DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<String>("monitor_speed", new SimpleStringSchema(), properties));

        DataStreamSource<String> ds1 = env.socketTextStream("172.16.10.45", 7777);

        //读取车辆监控信息
        DataStream<CardSpeedDTO> monitoryDs = ds1.map(new MapFunction<String, CardSpeedDTO>() {
            @Override
            public CardSpeedDTO map(String value) throws Exception {
                String[] split = value.split("#");
                CardSpeedDTO dto = new CardSpeedDTO();
                dto.setCardNo(split[0].trim());
                dto.setAreaId(split[1].trim());
                dto.setRoadId(split[2].trim());
                dto.setMonitorId(split[3].trim());
                dto.setCameraId(split[4].trim());
                dto.setActionTime(Long.valueOf(split[5].trim()));
                dto.setSpeed(Double.valueOf(split[6].trim()));
                return dto;
            }
        }, TypeInformation.of(CardSpeedDTO.class));
        //读取区域-道路-卡口-限速信息  读取Mysql数据 每隔半个小时读取一次
        DataStream<MonitorSpeedLimitInfo> monitorSpeedLimitInfoDataStream = env.addSource(new RichSourceFunction<MonitorSpeedLimitInfo>() {
            Connection connection;
            PreparedStatement pst;
            volatile boolean stop = false;

            //调用一次 初始化连接对象
            @Override
            public void open(Configuration parameters) throws Exception {
                connection = DriverManager.getConnection("jdbc:mysql://101.35.91.29:3399/flink-card", "root", "123456");
                pst = connection.prepareStatement("select `area_id`, `road_id`, `monitor_id`, `speed_limit` from t_monitor_speed_limit_info");
                super.open(parameters);
            }

            //产生数据的方法，
            @Override
            public void run(SourceContext<MonitorSpeedLimitInfo> ctx) throws Exception {
                while (!stop) {
                    ResultSet resultSet = pst.executeQuery();
                    while (resultSet.next()) {
                        String areaId = resultSet.getString("area_id");
                        String roadId = resultSet.getString("road_id");
                        String monitorId = resultSet.getString("monitor_id");
                        Double speedLimit = resultSet.getDouble("speed_limit");
                        System.out.println(speedLimit);
                        ctx.collect(new MonitorSpeedLimitInfo(areaId, roadId, monitorId, speedLimit));
                    }
                    TimeUnit.MINUTES.sleep(30);
                }
            }

            @Override
            public void cancel() {
                //当程序取消时，调用该方法
                stop = true;
            }
        }, TypeInformation.of(MonitorSpeedLimitInfo.class));

        MapStateDescriptor<String, Double> stateDescriptor = new MapStateDescriptor<>("map-state", String.class, Double.class);
        //得到广播流
        BroadcastStream<MonitorSpeedLimitInfo> broadcast = monitorSpeedLimitInfoDataStream.broadcast(stateDescriptor);


        BroadcastConnectedStream<CardSpeedDTO, MonitorSpeedLimitInfo> connect = monitoryDs.connect(broadcast);

        SingleOutputStreamOperator<OverSpeedCarInfo> process = connect.process(new BroadcastProcessFunction<CardSpeedDTO, MonitorSpeedLimitInfo, OverSpeedCarInfo>() {
            @Override
            public void processElement(CardSpeedDTO value, BroadcastProcessFunction<CardSpeedDTO, MonitorSpeedLimitInfo, OverSpeedCarInfo>.ReadOnlyContext ctx, Collector<OverSpeedCarInfo> out) throws Exception {
                String key = value.getAreaId() + "_" + value.getRoadId() + "_" + value.getMonitorId();
                ReadOnlyBroadcastState<String, Double> broadcastState = ctx.getBroadcastState(stateDescriptor);
                if (broadcastState.contains(key)) {
                    double limitSpeed = broadcastState.get(key);
                    if (value.getSpeed() * 1.2 > limitSpeed) {
                        //超速车辆
                        out.collect(new OverSpeedCarInfo(value.getAreaId(), value.getRoadId(), value.getMonitorId(), value.getCardNo(), value.getSpeed(), limitSpeed, new Date()));
                    } else {
                        if (value.getSpeed() > 60 * 1.2) {
                            out.collect(new OverSpeedCarInfo(value.getAreaId(), value.getRoadId(), value.getMonitorId(), value.getCardNo(), value.getSpeed(), 60 * 1.2, new Date()));
                        }
                    }
                }

            }

            @Override
            public void processBroadcastElement(MonitorSpeedLimitInfo value, BroadcastProcessFunction<CardSpeedDTO, MonitorSpeedLimitInfo, OverSpeedCarInfo>.Context ctx, Collector<OverSpeedCarInfo> out) throws Exception {
                BroadcastState<String, Double> broadcastState = ctx.getBroadcastState(stateDescriptor);
                String key = value.getAreaId() + "_" + value.getRoadId() + "_" + value.getMonitorId();
                System.out.println("====================>>>>" + key);
                broadcastState.put(key, value.getSpeedLimit());
            }
        });
        process.addSink(new JdbcSink());
        //将结果写入mysql

        env.execute();


    }
}
